基于 Binlog 实时采集数据到 Hive 在滴滴的实践
The following article is from 滴滴技术 Author 滴滴技术团队
大数据是这个时代赋予我们的强大引擎,在数字化大潮中 ,借助数据驱动的方法推动业务乘风破浪,几乎是每家公司的核心战略。数据驱动的落脚点是数据,能否将组织或业务运行过程中的信息,进行有效收集并组织成信息流,是数据驱动的基石所在。本文分享了滴滴数据体系建设过程中,MySQL这一类数据源的采集架构和应用实践。
1.
背景
关系模型构建起整个数据分析的基石,关系型数据库作为具体实现、采集MySQL数据接入Hive是很多企业进行数据分析的前提。如何及时、准确的把MySQL数据同步到Hive呢?
一般解决方案是使用类似Sqoop的工具,直连MySQL去Select数据存储到HDFS,然后把HDFS数据Load到Hive中。这种方法简单易操作,但随着业务规模扩大,不足之处也逐步暴露出来:
直连MySQL查询,对于数据库压力较大(如订单表、支付表等),可能直接影响在线业务
数据整体就位时间(尤其大表)不满足下游生产需求
扩展性较差,对于分表、字段增减、变更等的支持较弱
拉取的数据是该时刻的镜像,无法获取中间变化情况
为解决上述问题,我们引入Binlog实时采集 + 离线还原的解决方案,本文将从这两个方面介绍整个数据的接入流程。
按照上述流程采集binlog日志增量入HDFS
使用离线一次性拉取一份历史全量数据,按字段还原到Hive作为基点(即第一个接入周期的数据)
使用前一个接入周期的全量数据和本周期的增量binlog做merge形成该周期内的数据。
相比一般解决方案,其优点比较明显,主要表现在:
基于Binlog日志的数据还原,与在线业务解耦
采集通过分布式队列实时传递,还原操作在集群上实现,及时性及可扩展性强
Binlog日志包括了增、删、改等明细动作,支持定制化的ETL
Statement模式:每一条会修改数据的sql都会被记录在binlog中,如inserts, updates, deletes。
Row模式: 每一行的具体变更事件都会被记录在binlog中。
Canal主要运作方式如下:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向master发送dump协议
mysql master收到dump请求,开始推送binary log到canal
canal解析binary log对象,并将解析的结果编码成JSON格式的文本串
把解析后的文本串发送到消息队列并上报发送情况(如Kafka、DDMQ)
格式化后的单条记录新增消息示例如下:
{
"binlog": "25521@mysql-bin.000070",
"time": 1450236307000,
"canalTime": 1450236308279,
"db": "TestCanal",
"table": "g_order_010",
"event": "u",
"columns": [
{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},
{"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},
{ "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false},
{"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false},
{"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false},
{ "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false},
{ "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false},
{ "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true}
],
"keys": ["order_id"]
}{
"binlog": "25521@mysql-bin.000070",
"time": 1450236307000,
"canalTime": 1450236308279,
"db": "TestCanal",
"table": "g_order_010",
"event": "u",
"columns": [
{"n": "order_id", "t": "bigint(20)", "v": "126", "null": false, "updated": false},
{"n": "driver_id", "t": "bigint(20)", "v": "123456", "null": false, "updated": false},
{ "n": "passenger_id", "t": "bigint(20)", "v": "654321", "null": false, "updated": false},
{"n": "current_lng", "t": "decimal(10,6)", "v": "39.021400", "null": false, "updated": false},
{"n": "current_lat", "t": "decimal(10,6)", "v": "120.423300", "null": false, "updated": false},
{ "n": "starting_lng", "t": "decimal(10,6)", "v": "38.128000", "null": false, "updated": false},
{ "n": "starting_lat", "t": "decimal(10,6)", "v": "121.445000", "null": false, "updated": false},
{ "n": "dest_name", "t": "varchar(100)", "v": "Renmin University", "origin_val": "知春路", "null": false, "updated": true}
],
"keys": ["order_id"]
}
为保障整个Binlog链路中数据完整性,我们引入了Dquality服务。Dquality是数据通道中非常重要的一个环节,记录着整个数据通道每一个流程的数据信息,如某一段时间内的数据总和等。Dquality主要包含以下功能:
为数据回溯提供元数据支持
校验数据丢失与延迟情况
校验数据完整性
简单流程为数据链路上的各发送方在成功传递数据后,把投递结果以及时间信息发送到Dquality,Dquality统一汇总,分析判定每个时间段内数据是否完成及时准确传输,并把分析结果存储下来。下游数据使用方通过接口从Dquality查询该结果。
以Binlog链路为例,在Binlog流程中有两个环节Canal->MQ、MQ->HDFS,上报数据发送情况到Dquality。下游ETL环节使用Dquality接口查询数据就位情况,比如对于小时粒度任务,查询该小时的0分0秒到59分59秒之间的数据是否已经完成写入,如果已经完成写入,那么ETL任务就可以启动执行。
基于此,天或小时采集周期内的数据是固定的(幂等),以该时间段内的数据作为清洗基础,无论什么时候执行其结果不会改变。但在Canal上报环节,目前无法有效判定较小数据量场景和同步异常场景,一定程度上影响数据就位时间。
5.
一次性拉取&初始化
目前实现方式为通过DataX工具直连MySQL离线库,拉取一份截至到当前时间的全量数据,然后按列还原到Hive表的首个分区中。
全量采集场景下,下个分区的数据基于上个分区的数据和当前周期内的增量Binlog日志merge,即可产生该分区内的数据。
上面介绍了基于Binlog数据接入的整体流程,下面列举两个实际解决的业务问题。
6.
场景一:数据飘移的支持
case 1:订单的Binlog日志中,当订单事件的更新时间在59分59秒左右时,数据有可能会落在下一个小时的分区,以至于当前小时数据没有统计到该条订单,同时下一个小时分区的数据也没有打上相应的事件标签。
case 2:支付结算系统,当天所有交易记录会在次日凌晨后结算完成,按照默认采集逻辑,当天的记录落在次日的变更内,无法有效支持当天核算。
以上两个case的常规解决方案可能是把下个小时的数据也囊括到本采集周期内,但会导致数据就位时间延迟一个小时,扩散到数据下游,时间会更长,可能不满足实际需求。采集平台提供数据漂移的功能,即按需配置偏移量。比如小时粒度默认为00:00 - 59:59之间的数据,配置5min的偏移,那么数据区间为00:00 - 04:59(次小时),多出来的部分可以有效解决数据漂移功能,同时为及时性提供了有效支撑。
该功能在专快订单、财务应付应收以及国际化部分都有应用。但需要注意的是,下个采集周期内也包含了这部分实际发生在该区间内的数据。
7.
场景二:分库分表的支持
1. 统一MySQL使用规范,明确分库分表的命名规则,做到规则内自动化识别,同时完成全量元数据信息的收集,非规范化的命名规则无法自动化支持。
2. 默认情况下一个库的数据会收集到一个topic内,如果有分库存在也可以一并收集到一个topic内,保证逻辑上分库分表的数据物理上收集到一起。
3. 按照/{db}/{table}/{year}/{month}/{day}/{hour}的路径结构(其中日期由Binlog时间格式化生成)落地到HDFS上,一个逻辑表的数据存储在一起。
4. ETL处理阶段,取出上述路径下的Binlog日志,还原到Hive中。
为用户更好使用分库分表数据以及获取中间变化过程,ETL阶段额外再Hive表中写入三个字段:
system_rule_etl_update_field | 记录更新时间,更新晚的对应该字段的值更大,前十位是时间戳信息 |
system_rule_etl_delete_flag | 标识本条记录是否在上游数据库中被删除,0-正常记录,1-删除记录 |
system_rule_etl_uniq_key | 全局主键,由mysql库名+表名+主键拼接而成 |
8.
总结
本文作者
- EOF -
看完本文有收获?请转发分享给更多人
关注「大数据与机器学习文摘」,成为Top 1%
点赞和在看就是最大的支持❤️